package flinkstudy.batch;

import lombok.Data;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * Flink batch (DataSet API) demo: builds an in-memory collection of mock
 * {@link People} records, groups it once by city id and once by area id, and
 * prints one representative record plus the group size for every group.
 *
 * @author daocr
 * @date 2020/6/11
 */
public class Test {

    /** Number of mock records produced by {@link #mockData()}. */
    private static final int MOCK_RECORD_COUNT = 1000;

    /**
     * Builds the two grouping pipelines, prints the execution plan and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building the plan or executing the job fails
     */
    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Retry a failed job up to 3 times, waiting 20 seconds between attempts.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(20)));

        DataSource<People> peopleDataSource = env.fromCollection(mockData());

        // Group by city.
        groupAndPrint(peopleDataSource, new KeySelector<People, String>() {
            @Override
            public String getKey(People value) throws Exception {
                // String.valueOf keeps the original "null" key for a missing id.
                return String.valueOf(value.getCityId());
            }
        }, new FirstWithCount(), "城市");

        // Group by area.
        groupAndPrint(peopleDataSource, new KeySelector<People, String>() {
            @Override
            public String getKey(People value) throws Exception {
                return String.valueOf(value.getAreaId());
            }
        }, new FirstWithCount(), "区域");

        System.out.println(env.getExecutionPlan());
        env.execute();
    }

    /**
     * Groups the data set by the given key, reduces every group with the given
     * function and attaches a sink that prints each result to standard output,
     * prefixed with {@code type}.
     *
     * @param peopleDataSource        the records to group
     * @param keySelector             extracts the grouping key from a record
     * @param richGroupReduceFunction reduces each group to (record, count) tuples
     * @param type                    label printed in front of every result line
     */
    private static void groupAndPrint(DataSource<People> peopleDataSource,
                                      KeySelector<People, String> keySelector,
                                      RichGroupReduceFunction<People, Tuple2<People, Long>> richGroupReduceFunction,
                                      String type) {
        UnsortedGrouping<People> grouped = peopleDataSource.groupBy(keySelector);

        GroupReduceOperator<People, Tuple2<People, Long>> reduced = grouped.reduceGroup(richGroupReduceFunction);

        reduced.output(new OutputFormat<Tuple2<People, Long>>() {
            @Override
            public void configure(Configuration parameters) {
                // Nothing to configure: this sink only prints.
            }

            @Override
            public void open(int taskNumber, int numTasks) throws IOException {
                // No resources to acquire.
            }

            @Override
            public void writeRecord(Tuple2<People, Long> record) throws IOException {
                System.out.println(type + "   " + record);
            }

            @Override
            public void close() throws IOException {
                // No resources to release.
            }
        });
    }

    /**
     * Generates the mock input data.
     *
     * <p>Value ranges (upper bounds of {@code nextInt} are exclusive): city id
     * 605..609, area id 10..29, birthday in January or February 2020 on day
     * 1..25, room number 1..5.
     *
     * @return a mutable list of {@value #MOCK_RECORD_COUNT} records with ids {@code 0..n-1}
     */
    private static List<People> mockData() {
        List<People> mockData = new ArrayList<>(MOCK_RECORD_COUNT);

        ThreadLocalRandom random = ThreadLocalRandom.current();

        for (int i = 0; i < MOCK_RECORD_COUNT; i++) {
            People people = new People();
            people.setId(i);
            people.setCityId(random.nextInt(605, 610));
            people.setAreaId(random.nextInt(10, 30));
            // Days are capped at 25, so every generated date is valid for both months.
            people.setBirthday(LocalDate.of(2020, random.nextInt(1, 3), random.nextInt(1, 26)));
            people.setRoomNum(random.nextInt(1, 6));
            mockData.add(people);
        }
        return mockData;
    }

    /**
     * Reduces a group to a single tuple holding the first record of the group
     * and the number of records in it.
     *
     * <p>The group is consumed in one pass without buffering it, so memory use
     * does not grow with the group size.
     */
    private static final class FirstWithCount extends RichGroupReduceFunction<People, Tuple2<People, Long>> {

        private static final long serialVersionUID = 1L;

        @Override
        public void reduce(Iterable<People> values, Collector<Tuple2<People, Long>> out) throws Exception {
            // NOTE(review): the first element is referenced across the whole iteration;
            // revisit if object reuse is ever enabled on the ExecutionConfig.
            People first = null;
            long count = 0L;
            for (People value : values) {
                if (count == 0L) {
                    first = value;
                }
                count++;
            }
            // Emit nothing for an empty group instead of failing.
            if (count > 0L) {
                out.collect(Tuple2.of(first, count));
            }
        }
    }

    /**
     * Mock person record; getters, setters, equals/hashCode and toString are
     * generated by Lombok's {@code @Data}.
     */
    @Data
    public static class People {
        private Integer id;
        private Integer cityId;
        private Integer areaId;
        private LocalDate birthday;
        private Integer roomNum;
    }

}
